{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Demonstrate Data-Prep-kit transforms as LangChain or llama-index tools"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### This notebook is based on Data Prep Kit Demo\n",
    "link: https://github.com/data-prep-kit/data-prep-kit/blob/v0.2.3/examples/notebooks/intro/dpk_intro_1_ray.ipynb\n",
    "\n",
    "![](https://raw.githubusercontent.com/data-prep-kit/data-prep-kit/v0.2.3/examples/notebooks/intro/images/data-prep-kit-3-workflow.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Install dependencies. This can take some time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install -qq -r requirements.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install -qq -r dpk-requirements.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "! cd llm_utils/dpk/llama_index_tools/llama_index_tools_dpk && pip install -qq -e ."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install -qq llama-index"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use langchain or llama-index"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set to True to define DPK transforms as langchain tools; otherwise they will be defined as llama-index tools\n",
    "define_dpk_as_langchain_tools=False"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define the input task"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set to True to execute the transforms on the local Ray cluster; otherwise, the Python implementation is used.\n",
    "run_with_local_ray=False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray_text=\"\"\n",
    "if run_with_local_ray:\n",
    "    ray_text=\"on a local ray cluster \"\n",
    "\n",
    "task=f\"Execute pdf2parquet, doc_chunk, doc_id, ededup, text_encoder transforms {ray_text} one after the other where the input to a transform is the output of the previous transform run.\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set input/output paths"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil\n",
    "import os\n",
    "cwd = os.getcwd()\n",
    "\n",
    "output_base_path = f\"{cwd}/output\"\n",
    "\n",
    "input_folder = f\"{cwd}/test-data/input/\"\n",
    "output_folder =  f\"{output_base_path}/final_1/\"\n",
    "\n",
    "shutil.rmtree(output_base_path, ignore_errors=True)\n",
    "print (f\"✅ Cleared {output_base_path} directory\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set transforms parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "def prepare_params(params: dict):\n",
    "    params_json=json.dumps(params)\n",
    "    # trim clurly braces\n",
    "    return params_json[1:-1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from data_processing.utils import GB, ParamsUtils\n",
    "\n",
    "pdf2parquet_params_dict={\"data_files_to_use\": \"['.pdf']\", \"input_folder\":input_folder,  \"pdf2parquet_contents_type\": \"application/json\"}\n",
    "doc_chunk_params_dict={}\n",
    "doc_id_params_dict={\"doc_id_hash_column\": \"chunk_hash\", \"doc_id_int_column\": \"chunk_id\"}\n",
    "ededup_params_dict={\"ededup_doc_column\": \"contents\", \"ededup_doc_id_column\": \"chunk_hash\"}\n",
    "text_encoder_params_dict={\"text_encoder_model_name\": \"sentence-transformers/all-MiniLM-L6-v2\", \"output_folder\":output_folder}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if run_with_local_ray:\n",
    "    worker_options_str=ParamsUtils.convert_to_ast({\"num_cpus\" : 0.8, \"memory\": 2 * GB})\n",
    "    ededup_params_dict=ededup_params_dict|{\"ededup_hash_cpu\": 0.5, \n",
    "                    \"ededup_num_hashes\": 2,\n",
    "                    \"runtime_worker_options\": worker_options_str,\n",
    "                    \"runtime_num_workers\": 2}\n",
    "    \n",
    "pdf2parquet_params=prepare_params(pdf2parquet_params_dict)\n",
    "doc_chunk_params=prepare_params(doc_chunk_params_dict)\n",
    "doc_id_params=prepare_params(doc_id_params_dict)\n",
    "ededup_params=prepare_params(ededup_params_dict)\n",
    "text_encoder_params=prepare_params(text_encoder_params_dict)\n",
    "\n",
    "params=f\"for pdf2parquet params use {pdf2parquet_params}. for doc_id use params {doc_id_params}. for ededup use params {ededup_params}. for text_encoder use params {text_encoder_params}\"\n",
    "input=f\"{task} {params}\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Print input task"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.display import HTML\n",
    "\n",
    "print_task=f\"<p><span style='color:blue; font-weight:bold; font-size:14.0pt;'>TASK: {task}</span></p>\"\n",
    "print_pdf2parquet=f\"<p><span style='white-space: pre-wrap;color:green; ;font-size:10pt;'>PDF2PARQUET Params: {pdf2parquet_params}</span></p> \"\n",
    "print_doc_chunks=f\"<p><span style='white-space: pre-wrap;color:green; ;font-size:10pt;'>DOC CHUNKS Params: {doc_chunk_params}</span></p> \"\n",
    "print_doc_id_params=f\"<p><span style='white-space: pre-wrap;color:green; ;font-size:10pt;'>DOC_ID Params: {doc_id_params}</span> </p>\"\n",
    "print_ededup_params=f\"<p><span style='color:green; ;font-size:10pt;'>EDEDUP Params: {ededup_params}</span></p> \"\n",
    "print_text_encoder_params=f\"<p><span style='color:green; ;font-size:10pt;'>TEXT_ENCODER Params: {text_encoder_params}</span></p> \"\n",
    "\n",
    "HTML(f\"{print_task}{print_pdf2parquet}{print_doc_chunks}{print_doc_id_params}{print_ededup_params}{print_text_encoder_params}</span>\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define LLM models and tools\n",
    "\n",
    "We have have tested our project with the following LLM execution frameworks: [Watsonx](https://www.ibm.com/watsonx), [Replicate](https://replicate.com/), and locally running [Ollama](https://ollama.com/).\n",
    "To use one of the frameworks uncomment its part in the cell below while commenting out the other frameworks.\n",
    "Please note that the notebooks have been tested with specific Large Language Models (LLMs) that are mentioned in the cell, and due to the inherent nature of LLMs, using a different model may not produce the same results.\n",
    "\n",
    "- To use Replicate:\n",
    "  - Obtain Replicate API token\n",
    "  - Store the following value in the `.env` file located in your project directory:\n",
    "    ```\n",
    "        REPLICATE_API_TOKEN=<your Replicate API token>\n",
    "    ```\n",
    "- To use Ollama: \n",
    "  - Download [Ollama](https://ollama.com/download).\n",
    "  - Download one of the supported [models](https://ollama.com/search). We tested with `llama3.3` model.\n",
    "  - update the `model_ollama_*` names if needed.\n",
    "- To use Watsonx:\n",
    "  - Register for Watsonx\n",
    "  - Obtain its API key\n",
    "  - Store the following values in the `.env` file located in your project directory:\n",
    "    ```\n",
    "        WATSONX_URL=<WatsonX entry point, e.g. https://us-south.ml.cloud.ibm.com>\n",
    "        WATSON_PROJECT_ID=<your Watsonx project ID>\n",
    "        WATSONX_APIKEY=<your Watsonx API key>\n",
    "    ```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload 2\n",
    "\n",
    "from dotenv import dotenv_values\n",
    "config = dotenv_values(\".env\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from llm_utils.logging import prep_loggers\n",
    "\n",
    "os.environ[\"LLM_LOG_PATH\"] = \"./logs/llm_log.txt\"\n",
    "os.environ[\"TOOL_CALLING_LOG_PATH\"] = \"./logs/tool_log.txt\"\n",
    "prep_loggers(\"llm=INFO,tool_calling=INFO\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llm_utils.models import getChatLLM\n",
    "### Define the model\n",
    "\n",
    "config = dotenv_values(\"./.env\")\n",
    "# watsonx part \n",
    "\n",
    "# model_watsonx_id = \"meta-llama/llama-3-1-70b-instruct\"\n",
    "# llm = getChatLLM(\"watsonx\", model_watsonx_id, config)\n",
    "\n",
    "# # ollama part\n",
    "# model_ollama = \"llama3.3\"\n",
    "# llm = getChatLLM(\"ollama\", model_ollama)\n",
    "\n",
    "# replicate part\n",
    "# You can use different llm models\n",
    "model_replicate = \"meta/meta-llama-3-70b-instruct\"\n",
    "llm = getChatLLM(\"replicate\", model_replicate, config)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### List DPK transforms"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if define_dpk_as_langchain_tools:\n",
    "    from llm_utils.dpk.langchain_tools.agent_toolkit.toolkit import DataPrepKitToolkit\n",
    "    \n",
    "    toolkit = DataPrepKitToolkit()  \n",
    "    tools = toolkit.get_tools()\n",
    "    print(\"-- DPK tools: --\")\n",
    "    print(tools)\n",
    "else:\n",
    "    from llm_utils.dpk.llama_index_tools.llama_index_tools_dpk.llama_index_dpk.tools.dpk.base import DPKTransformsToolSpec\n",
    "    \n",
    "    dpk_spec = DPKTransformsToolSpec()\n",
    "    tools = dpk_spec.to_tool_list()\n",
    "    print(\"-- DPK tools: --\")\n",
    "    for t in tools:\n",
    "        print(t.metadata.name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if define_dpk_as_langchain_tools:\n",
    "    from langchain.tools import Tool\n",
    "    from typing import Union, List\n",
    "    \n",
    "    def find_tool_by_name(tools: List[Tool], tool_name: str) -> Tool:\n",
    "        for tool in tools:\n",
    "            if tool.name == tool_name:\n",
    "                return tool\n",
    "        raise ValueError(f\"Tool with name {tool_name} not found\")\n",
    "else:\n",
    "    from llama_index.core.tools import FunctionTool\n",
    "    from typing import Union, List\n",
    "    \n",
    "    def find_tool_by_name(tools: List[FunctionTool], tool_name: str) -> FunctionTool:\n",
    "        for tool in tools:\n",
    "            if tool.metadata.name == tool_name:\n",
    "                return tool\n",
    "        raise ValueError(f\"Tool with name {tool_name} not found\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if define_dpk_as_langchain_tools:\n",
    "    from langchain.tools.render import render_text_description\n",
    "    \n",
    "    tools_str = render_text_description(tools)\n",
    "    tool_names = \", \".join([t.name for t in tools]),\n",
    "else:\n",
    "    tools_str = '\\n'.join(dpk_spec.spec_functions)\n",
    "    tool_names=\", \".join(dpk_spec.spec_functions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define the Prompt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain.prompts import PromptTemplate\n",
    "from langchain.tools.render import render_text_description\n",
    "from langchain_core.prompts import ChatPromptTemplate\n",
    "\n",
    "\n",
    "prompt_template = ChatPromptTemplate.from_template( \"\"\"Answer the following questions as best you can. You have access to the following tools:\n",
    "\n",
    "    {tools}\n",
    "    \n",
    "    Use the following format:\n",
    "    \n",
    "    Question: the input question you must answer\n",
    "    Thought: you should always think about what to do\n",
    "    Action: the action to take, should be one of [{tool_names}]\n",
    "    Action Input: the input to the action\n",
    "    Observation: the result of the action\n",
    "    ... (this Thought/Action/Action Input/Observation can repeat N times)\n",
    "    Thought: I now know the final answer\n",
    "    Final Answer: the final answer to the original input question\n",
    "\n",
    "    Final Answer or Action should appear in the answer but not both.\n",
    "    Follow the exact Action Input format provided in the examples when crafting your response.\n",
    "    Avoid numbering the output.\n",
    "\n",
    "    Here's an example.\n",
    "\n",
    "    For example, If the required task was to execute ededup , doc_id transforms one after the other. \n",
    "    The output directory of a transform is the input for the next transform in the transform order. \n",
    "    for ededup params use: 'input_folder':'/home/user/input/ededup'\n",
    "    for doc_id params use : 'output_folder':'/home/user/output/final'. \n",
    "    The output should be the following:\n",
    "      \n",
    "    Thought: I need to execute the ededup and doc_id one after the other.\n",
    "    \n",
    "    Action: ededup\n",
    "    Action Input: \"input_folder\":\"/home/user/input/ededup\", \"output_folder\":\"/home/user/output/ededup\"\n",
    "    Observation: The output of the ededup transform is stored in \"/home/user/output/ededup\".\n",
    "\n",
    "    Action: doc_id\n",
    "    Action Input: \"input_folder\":\"/home/user/output/ededup\", \"output_folder\":\"/home/user/output/final\"\n",
    "    Observation: The output of the doc_id transform is stored in \"/home/user/output/final\".\n",
    "\n",
    "    Here's another example: \n",
    "\n",
    "    If the required task was to execute ededup , doc_id transforms on a local ray cluster one after the other. \n",
    "    The output directory of a transform is the input for the next transform in the transform order. \n",
    "    for ededup params use: 'input_folder':'/home/user/input/ededup'\n",
    "    for doc_id params use : 'output_folder':'/home/user/output/final'\n",
    "    The output should be the following:\n",
    "      \n",
    "    Thought: I need to execute the ededup and doc_id one after the other.\n",
    "    \n",
    "    Action: ededup\n",
    "    Action Input: \"runtime_type\": \"ray\", \"run_locally\": \"True\", \"input_folder\":\"/home/user/input/ededup\", \"output_folder\":\"/home/user/output/ededup\"\n",
    "    Observation: The output of the ededup transform is stored in \"/home/user/output/ededup\".\n",
    "\n",
    "    Action: doc_id\n",
    "    Action Input: \"runtime_type\": \"ray\", \"run_locally\": \"True\", \"input_folder\":\"/home/user/output/ededup\", \"output_folder\":\"/home/user/output/final\"\n",
    "    Observation: The output of the doc_id transform is stored in \"/home/user/output/final\".\n",
    "\n",
    "    \n",
    "    Begin!\n",
    "    \n",
    "    Question: {input}\n",
    "    \"\"\")\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Invoke the agent to create the plan"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain.prompts import PromptTemplate\n",
    "print(input)\n",
    "\n",
    "agent = prompt_template | llm \n",
    "\n",
    "agent_step = \"\"\n",
    "agent_step = agent.invoke(\n",
    "            {\n",
    "                \"input\": input,\n",
    "                \"tool_names\": tool_names,\n",
    "                \"tools\": tools_str,\n",
    "            }\n",
    "        )\n",
    "   \n",
    "print(agent_step.content)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parse the plan"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "content = agent_step.content\n",
    "if type(content) == list:\n",
    "    content = ''.join(content)\n",
    "    print(content)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import re\n",
    "\n",
    "regex = (r\"Action\\s*\\d*\\s*:[\\s]*(.*?)[\\s]*Action\\s*\\d*\\s*Input\\s*\\d*\\s*:[\\s]*(.*)\")\n",
    "matches = re.findall(regex, content)\n",
    "\n",
    "print(\"LLM result contain the following transforms:\\n\")\n",
    "for match in matches:\n",
    "    print(f\"TRANSFORM NAME {match[0]}\")\n",
    "    print(f\"TRANSFORM PARAMS {match[1]}\")\n",
    "    print(\"--------------------------------------\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from typing import Any\n",
    "\n",
    "def load_from_json(js: str) -> dict[str, Any]:\n",
    "        try:\n",
    "            return json.loads(js)\n",
    "        except Exception as e:\n",
    "            print(f\"Failed to load parameters {js} with error {e}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Execute the transfoms by calling their tool definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_tool(match) -> str:\n",
    "    def contains_parquet_files(dir_path):\n",
    "      return any(file.endswith(\".parquet\") for file in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, file)))\n",
    "\n",
    "    tool_name = match[0]\n",
    "    tool_to_use = find_tool_by_name(tools, tool_name)\n",
    "    tool_name = match[0]\n",
    "    tool_input=\"{\"+match[1]+\"}\"\n",
    "    tool_input_dict = load_from_json(tool_input)\n",
    "    print(\"=======================================================\")\n",
    "    print (f\"🏃🏼 RUNNING {tool_name} with params: {tool_input_dict}\")\n",
    "    print(\"=======================================================\")\n",
    "    if define_dpk_as_langchain_tools:\n",
    "        tool_result  = tool_to_use.run(tool_input_dict)\n",
    "    else:\n",
    "        tool_result  = tool_to_use.call(**tool_input_dict)\n",
    "    if not contains_parquet_files(tool_input_dict[\"output_folder\"]):\n",
    "        out_dir=tool_input_dict[\"output_folder\"]\n",
    "        raise Exception (f\"The {out_dir} directory is unexpectedly empty, indicating the job failed.\")\n",
    "    print (f\"✅ {tool_result}\")\n",
    "    \n",
    "    return tool_result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "error=False\n",
    "for match in matches:\n",
    "    try:\n",
    "        tool_result = run_tool(match)\n",
    "        time.sleep(10)\n",
    "    except Exception as e:\n",
    "            error=True\n",
    "            print(f\"❌ Error: \" + str(e))\n",
    "            break\n",
    "\n",
    "if not error:\n",
    "    print(\"=================================================\")\n",
    "    print (f\"✅ Transforms execution completed successfully\")\n",
    "    print(\"=================================================\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Inspect Generated Output File"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You will see a column called embeddings added at the end. This the text content converted into vectors or embeddings. \n",
    "We used the model sentence-transformers/all-MiniLM-L6-v2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import glob\n",
    "import pandas as pd\n",
    "\n",
    "def read_parquet_files_as_df (parquet_dir):\n",
    "    parquet_files = glob.glob(f'{parquet_dir}/*.parquet')\n",
    "\n",
    "    # read each parquet file into a DataFrame and store in a list\n",
    "    dfs = [pd.read_parquet (f) for f in parquet_files]\n",
    "\n",
    "    # Concatenate all DataFrames into a single DataFrame\n",
    "    data_df = pd.concat(dfs, ignore_index=True)\n",
    "    return data_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# print the last transform output\n",
    "last_transform=matches[-1]\n",
    "tool_input=\"{\"+match[1]+\"}\"\n",
    "tool_input_dict = load_from_json(tool_input)\n",
    "dir=tool_input_dict[\"output_folder\"]\n",
    "print(dir)\n",
    "output_df = read_parquet_files_as_df(dir)\n",
    "\n",
    "print (\"Output data dimensions (rows x columns)= \", output_df.shape)\n",
    "\n",
    "output_df.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "venv",
   "language": "python",
   "name": "venv"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
